Skip to content

feat: add fetch_sample_values_from_database method#332

Merged
AnshumanX304 merged 2 commits into
datachecks:mainfrom
AnshumanX304:sample_val
Sep 26, 2025
Merged

feat: add fetch_sample_values_from_database method#332
AnshumanX304 merged 2 commits into
datachecks:mainfrom
AnshumanX304:sample_val

Conversation

@AnshumanX304

@AnshumanX304 AnshumanX304 commented Sep 26, 2025

Copy link
Copy Markdown
Contributor

Fixes/Implements

Description

Summary Goes here.

Type of change

Delete irrelevant options.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

  • Locally Tested
  • Needs Testing From Production

Summary by CodeRabbit

  • New Features
    • Preview sample rows for selected columns from database tables (Postgres and Sybase).
    • Specify a row limit to quickly inspect data without running full queries.
    • Input validation now requires at least one column be selected before fetching samples, improving reliability and user feedback.

@coderabbitai

coderabbitai Bot commented Sep 26, 2025

Copy link
Copy Markdown

Walkthrough

Adds a new public method fetch_sample_values_from_database to PostgresDataSource and SybaseDataSource that builds and executes a SELECT for specified columns with a configurable limit and returns fetched rows; Postgres validates non-empty column list and resolves fully qualified table names.

Changes

Cohort / File(s) Summary
Database sampling methods
dcs_core/integrations/databases/postgres.py, dcs_core/integrations/databases/sybase.py
Added fetch_sample_values_from_database(table_name: str, column_names: list[str], limit: int = 5) to both classes. Postgres: validates non-empty column_names, resolves fully qualified table name, builds SELECT ... LIMIT n, executes and returns List[Tuple]. Sybase: qualifies table name, builds SELECT TOP n ..., executes and returns list[Tuple].

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant C as Caller
  participant P as PostgresDataSource
  participant S as SybaseDataSource
  participant DB as Database

  rect rgba(200,230,255,0.25)
    note over C,P: Postgres sampling flow (new)
    C->>P: fetch_sample_values_from_database(table, columns, limit)
    P->>P: Validate columns non-empty (raise ValueError if empty)
    P->>P: Resolve qualified table name
    P->>P: Build SELECT columns FROM table LIMIT n
    P->>DB: Execute query
    DB-->>P: rows (list of tuples)
    P-->>C: rows
  end

  rect rgba(220,255,220,0.25)
    note over C,S: Sybase sampling flow (new)
    C->>S: fetch_sample_values_from_database(table, columns, limit)
    S->>S: Resolve/qualify table name
    S->>S: Build SELECT TOP n columns FROM table
    S->>DB: Execute query
    DB-->>S: rows (list of tuples)
    S-->>C: rows
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

I twitch my whiskers, query-bound,
Two tunnels dug where rows are found—
Postgres checks the columns first,
Sybase leaps to TOP with burst.
Sampled rows in gentle hop, I hum and hop and never stop. 🐇

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The pull request description contains only the template with placeholder text and lacks a summary of changes, a selected type of change, and detailed testing information, so it does not meet the repository’s required structure. It fails to provide context or implementation details, making it impossible to understand what this PR accomplishes and which category it falls under. Please replace the placeholder text with a clear summary of the new method’s functionality, select the appropriate type of change checkbox, and include details on how the feature was tested to satisfy the repository’s PR template requirements.
Docstring Coverage ⚠️ Warning Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Title Check ✅ Passed The title clearly indicates that a new feature method named fetch_sample_values_from_database is being added and directly reflects the primary change in the pull request. It is short, specific, and free of any vague terms or unnecessary information. Therefore, it meets the criteria for a clear and concise pull request title.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@subhankarb

Copy link
Copy Markdown
Contributor

@AnshumanX304
We need the sample value for all the databases

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
dcs_core/integrations/databases/postgres.py (1)

288-312: Consider aligning the public API with Sybase for consistency

Postgres returns List[Tuple] while Sybase initially declared Tuple[List, Optional[List[str]]]. After fixing Sybase, ensure both return the same type (prefer List[Tuple] as implemented here) to simplify multi-DB callers.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1e6e5a6 and a10d0ff.

📒 Files selected for processing (2)
  • dcs_core/integrations/databases/postgres.py (1 hunks)
  • dcs_core/integrations/databases/sybase.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
dcs_core/integrations/databases/postgres.py (1)
dcs_core/core/datasource/sql_datasource.py (3)
  • qualified_table_name (130-138)
  • quote_column (148-154)
  • fetchall (117-120)
dcs_core/integrations/databases/sybase.py (3)
dcs_core/integrations/databases/postgres.py (3)
  • fetch_sample_values_from_database (288-311)
  • qualified_table_name (56-64)
  • quote_column (66-72)
dcs_core/integrations/databases/mssql.py (2)
  • qualified_table_name (110-118)
  • quote_column (120-126)
dcs_core/integrations/databases/spark_df.py (2)
  • cursor (80-81)
  • fetchmany (45-53)
🪛 Ruff (0.13.1)
dcs_core/integrations/databases/postgres.py

305-305: Avoid specifying long messages outside the exception class

(TRY003)


308-308: Possible SQL injection vector through string-based query construction

(S608)

dcs_core/integrations/databases/sybase.py

614-614: Possible SQL injection vector through string-based query construction

(S608)

Comment on lines +304 to +311
if not column_names:
raise ValueError("At least one column name must be provided")

columns = ", ".join([self.quote_column(col) for col in column_names])
query = f"SELECT {columns} FROM {table_name} LIMIT {limit}"
result = self.connection.execute(text(query))
rows = result.fetchall()
return rows

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate limit and bind it as a parameter; escape identifiers

  • S608: limit is injected directly into the SQL. Bind it via SQLAlchemy and validate it’s a positive int.
  • Escape embedded double quotes in identifiers to avoid breaking out of quoted names.

Apply:

         if not column_names:
             raise ValueError("At least one column name must be provided")
+        if not isinstance(limit, int) or limit <= 0:
+            raise ValueError("limit must be a positive integer")
 
         columns = ", ".join([self.quote_column(col) for col in column_names])
-        query = f"SELECT {columns} FROM {table_name} LIMIT {limit}"
-        result = self.connection.execute(text(query))
+        stmt = text(f"SELECT {columns} FROM {table_name} LIMIT :limit")
+        result = self.connection.execute(stmt, {"limit": limit})
         rows = result.fetchall()
         return rows

Outside this hunk, harden identifier quoting:

 def qualified_table_name(self, table_name: str) -> str:
-        if self.schema_name:
-            return f'"{self.schema_name}"."{table_name}"'
-        return f'"{table_name}"'
+        if self.schema_name:
+            schema = self.schema_name.replace('"', '""')
+            table = table_name.replace('"', '""')
+            return f'"{schema}"."{table}"'
+        table = table_name.replace('"', '""')
+        return f'"{table}"'
 
 def quote_column(self, column: str) -> str:
-        return f'"{column}"'
+        safe = column.replace('"', '""')
+        return f'"{safe}"'

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.13.1)

305-305: Avoid specifying long messages outside the exception class

(TRY003)


308-308: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
In dcs_core/integrations/databases/postgres.py around lines 304 to 311, the code
injects the LIMIT value and identifiers directly into the SQL; validate that
limit is a positive integer and bind it as a SQLAlchemy parameter (e.g., use a
:limit bind) instead of string interpolation, and ensure identifiers are safely
quoted by escaping embedded double quotes inside quote_column (replace any "
with "" before wrapping in quotes) and apply the same quoting to table_name (or
use a centralized quote_identifier helper) so no unescaped quotes can break the
query.

Comment on lines +606 to +620
def fetch_sample_values_from_database(
self,
table_name: str,
column_names: list[str],
limit: int = 5,
) -> Tuple[List, Optional[List[str]]]:
table_name = self.qualified_table_name(table_name)
columns = ", ".join([self.quote_column(col) for col in column_names])
query = f"SELECT TOP {limit} {columns} FROM {table_name}"

cursor = self.connection.cursor()
cursor.execute(query)
rows = cursor.fetchmany(limit)
return rows

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix return-type mismatch, add validation, and reuse existing fetch_rows

  • The annotation says Tuple[List, Optional[List[str]]] but the function returns only rows. This will surprise callers and violates the signature.
  • Missing validation for empty column_names and non-positive limit leads to runtime errors and potential misuse.
  • Reuse fetch_rows for consistency and to avoid duplicate cursor logic.

Apply:

-    def fetch_sample_values_from_database(
-        self,
-        table_name: str,
-        column_names: list[str],
-        limit: int = 5,
-    ) -> Tuple[List, Optional[List[str]]]:
-        table_name = self.qualified_table_name(table_name)
-        columns = ", ".join([self.quote_column(col) for col in column_names])
-        query = f"SELECT TOP {limit} {columns} FROM {table_name}"
-
-        cursor = self.connection.cursor()
-        cursor.execute(query)
-        rows = cursor.fetchmany(limit)
-        return rows
+    def fetch_sample_values_from_database(
+        self,
+        table_name: str,
+        column_names: list[str],
+        limit: int = 5,
+    ) -> List[Tuple]:
+        table_name = self.qualified_table_name(table_name)
+        if not column_names:
+            raise ValueError("At least one column name must be provided")
+        if not isinstance(limit, int) or limit <= 0:
+            raise ValueError("limit must be a positive integer")
+        columns = ", ".join([self.quote_column(col) for col in column_names])
+        base_query = f"SELECT {columns} FROM {table_name}"
+        rows, _ = self.fetch_rows(query=base_query, limit=limit, with_column_names=False)
+        return rows

Additionally, escape identifier delimiters to harden against injection via crafted names:

Outside this hunk, update quote_column:

 def quote_column(self, column: str) -> str:
-        return f"[{column}]"
+        safe = column.replace("]", "]]")
+        return f"[{safe}]"

And qualified_table_name:

 def qualified_table_name(self, table_name: str) -> str:
-        if self.schema_name:
-            return f"[{self.schema_name}].[{table_name}]"
-        return f"[{table_name}]"
+        if self.schema_name:
+            schema_safe = self.schema_name.replace("]", "]]")
+            table_safe = table_name.replace("]", "]]")
+            return f"[{schema_safe}].[{table_safe}]"
+        table_safe = table_name.replace("]", "]]")
+        return f"[{table_safe}]"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def fetch_sample_values_from_database(
self,
table_name: str,
column_names: list[str],
limit: int = 5,
) -> Tuple[List, Optional[List[str]]]:
table_name = self.qualified_table_name(table_name)
columns = ", ".join([self.quote_column(col) for col in column_names])
query = f"SELECT TOP {limit} {columns} FROM {table_name}"
cursor = self.connection.cursor()
cursor.execute(query)
rows = cursor.fetchmany(limit)
return rows
def fetch_sample_values_from_database(
self,
table_name: str,
column_names: list[str],
limit: int = 5,
) -> List[Tuple]:
table_name = self.qualified_table_name(table_name)
if not column_names:
raise ValueError("At least one column name must be provided")
if not isinstance(limit, int) or limit <= 0:
raise ValueError("limit must be a positive integer")
columns = ", ".join([self.quote_column(col) for col in column_names])
base_query = f"SELECT {columns} FROM {table_name}"
rows, _ = self.fetch_rows(query=base_query, limit=limit, with_column_names=False)
return rows
Suggested change
def fetch_sample_values_from_database(
self,
table_name: str,
column_names: list[str],
limit: int = 5,
) -> Tuple[List, Optional[List[str]]]:
table_name = self.qualified_table_name(table_name)
columns = ", ".join([self.quote_column(col) for col in column_names])
query = f"SELECT TOP {limit} {columns} FROM {table_name}"
cursor = self.connection.cursor()
cursor.execute(query)
rows = cursor.fetchmany(limit)
return rows
def quote_column(self, column: str) -> str:
safe = column.replace("]", "]]")
return f"[{safe}]"
Suggested change
def fetch_sample_values_from_database(
self,
table_name: str,
column_names: list[str],
limit: int = 5,
) -> Tuple[List, Optional[List[str]]]:
table_name = self.qualified_table_name(table_name)
columns = ", ".join([self.quote_column(col) for col in column_names])
query = f"SELECT TOP {limit} {columns} FROM {table_name}"
cursor = self.connection.cursor()
cursor.execute(query)
rows = cursor.fetchmany(limit)
return rows
def qualified_table_name(self, table_name: str) -> str:
if self.schema_name:
schema_safe = self.schema_name.replace("]", "]]")
table_safe = table_name.replace("]", "]]")
return f"[{schema_safe}].[{table_safe}]"
table_safe = table_name.replace("]", "]]")
return f"[{table_safe}]"
🧰 Tools
🪛 Ruff (0.13.1)

614-614: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
In dcs_core/integrations/databases/sybase.py around lines 606-620, the function
fetch_sample_values_from_database currently returns only rows despite being
annotated as Tuple[List, Optional[List[str]]]; it also misses validation for
empty column_names and non-positive limit and reimplements cursor logic. Change
the function to validate that column_names is non-empty and limit is a positive
integer (raise ValueError on invalid input), build the SELECT TOP {limit} query
using quoted columns and the qualified table name, then call and return the
existing self.fetch_rows(query, limit) (so both rows and optional column names
are returned and cursor handling is reused). Additionally, update quote_column
and qualified_table_name elsewhere to escape identifier delimiters properly to
harden against crafted names (ensure delimiters inside identifiers are
doubled/escaped according to Sybase rules).

Comment on lines +613 to +615
columns = ", ".join([self.quote_column(col) for col in column_names])
query = f"SELECT TOP {limit} {columns} FROM {table_name}"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Mitigate SQL injection and ensure consistent API across data sources

  • S608: string-built SQL; although identifiers are quoted, limit is injected as-is and identifiers aren’t escaped. The above refactor addresses this by escaping bracket delimiters and validating limit. Please also confirm the return type now matches Postgres’ method (List[Tuple]) for cross-DB consistency.

Run to find usages and ensure callers expect a List[Tuple] from both Postgres and Sybase:


🏁 Script executed:

#!/bin/bash
# Definitions
rg -nP '^\s*def\s+fetch_sample_values_from_database\b' -C2 --type=py
# Call-sites (exclude defs)
rg -nP '(?<!def\s+)\bfetch_sample_values_from_database\s*\(' -C2 --type=py

Length of output: 767


🏁 Script executed:

#!/bin/bash
# Search for all call-sites of fetch_sample_values_from_database
rg -n "fetch_sample_values_from_database\(" -C2 --type=py

Length of output: 698


🏁 Script executed:

#!/bin/bash
# Inspect SQL construction in Sybase integration
rg -n "SELECT TOP" -C5 --type=py dcs_core/integrations/databases/sybase.py

# Show fetch_sample_values_from_database signature and return in Postgres
rg -nP "def fetch_sample_values_from_database" -C5 --type=py dcs_core/integrations/databases/postgres.py

# Show fetch_sample_values_from_database signature and return in Sybase
rg -nP "def fetch_sample_values_from_database" -C5 --type=py dcs_core/integrations/databases/sybase.py

Length of output: 2113


Validate limit and align Sybase return type with Postgres

  • Coerce/validate limit to an int before interpolating into the SQL (dcs_core/integrations/databases/sybase.py:613-614) to eliminate injection risk.
  • Change the signature from -> Tuple[List, Optional[List[str]]] to -> List[Tuple] and return only rows, matching Postgres.
🧰 Tools
🪛 Ruff (0.13.1)

614-614: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
In dcs_core/integrations/databases/sybase.py around lines 613-615, coerce and
validate the incoming limit to an int (e.g., try: limit = int(limit); raise a
ValueError or use a safe default if conversion fails, and ensure limit is
non-negative) before interpolating it into the SQL string to avoid injection,
and then change the function signature and return value to match Postgres by
returning only rows with type List[Tuple] (remove the Optional[List[str]] from
the return and return rows only); update any related type hints/imports and
callers accordingly.

@AnshumanX304 AnshumanX304 merged commit f746eb2 into datachecks:main Sep 26, 2025
4 of 7 checks passed

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
dcs_core/integrations/databases/sybase.py (1)

210-217: Enhance identifier escaping to prevent injection via crafted names

The current quote_column and qualified_table_name methods don't escape square bracket delimiters within identifiers, which could potentially allow injection through crafted column or table names containing ] characters.

Apply these diffs to properly escape identifiers:

 def quote_column(self, column: str) -> str:
     """
     Quote the column name
     :param column: name of the column
     :return: quoted column name
     """
-    return f"[{column}]"
+    safe = column.replace("]", "]]")
+    return f"[{safe}]"
 def qualified_table_name(self, table_name: str) -> str:
     """
     Get the qualified table name
     :param table_name: name of the table
     :return: qualified table name
     """
     if self.schema_name:
-        return f"[{self.schema_name}].[{table_name}]"
-    return f"[{table_name}]"
+        schema_safe = self.schema_name.replace("]", "]]")
+        table_safe = table_name.replace("]", "]]")
+        return f"[{schema_safe}].[{table_safe}]"
+    table_safe = table_name.replace("]", "]]")
+    return f"[{table_safe}]"

This follows SQL Server/Sybase identifier escaping rules where square brackets within identifiers must be doubled to be treated literally.

🧹 Nitpick comments (1)
dcs_core/integrations/databases/sybase.py (1)

612-620: Consider reusing existing fetch_rows method for consistency

The method duplicates cursor logic that already exists in the fetch_rows method. This could lead to maintenance issues and inconsistent behavior across the codebase.

Apply this diff to leverage the existing fetch_rows method:

-        cursor = self.connection.cursor()
-        cursor.execute(query)
-        rows = cursor.fetchmany(limit)
-        return rows
+        base_query = f"SELECT {columns} FROM {table_name}"
+        rows, _ = self.fetch_rows(query=base_query, limit=limit, with_column_names=False)
+        return rows

This approach:

  • Reuses existing, tested cursor handling logic
  • Maintains consistency with other database methods
  • Reduces code duplication
  • Leverages the existing TOP clause handling in fetch_rows
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a10d0ff and 6a20b56.

📒 Files selected for processing (1)
  • dcs_core/integrations/databases/sybase.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
dcs_core/integrations/databases/sybase.py (3)
dcs_core/integrations/databases/postgres.py (3)
  • fetch_sample_values_from_database (288-311)
  • qualified_table_name (56-64)
  • quote_column (66-72)
dcs_core/integrations/databases/mssql.py (2)
  • qualified_table_name (110-118)
  • quote_column (120-126)
dcs_core/integrations/databases/spark_df.py (2)
  • cursor (80-81)
  • fetchmany (45-53)
🪛 Ruff (0.13.1)
dcs_core/integrations/databases/sybase.py

614-614: Avoid specifying long messages outside the exception class

(TRY003)


616-616: Possible SQL injection vector through string-based query construction

(S608)

🔇 Additional comments (1)
dcs_core/integrations/databases/sybase.py (1)

606-621: Fix SQL injection vulnerability and validation issues

This implementation has multiple critical issues that need immediate attention:

  1. SQL injection vulnerability: The limit parameter is directly interpolated into the query without validation, making it vulnerable to injection attacks.
  2. Inconsistent return type: The function returns list[Tuple] but doesn't match the Postgres implementation signature.
  3. Missing input validation: No validation for empty column names or invalid limit values.

Apply this diff to fix all issues:

-    def fetch_sample_values_from_database(
-        self,
-        table_name: str,
-        column_names: list[str],
-        limit: int = 5,
-    ) -> list[Tuple]:
-        table_name = self.qualified_table_name(table_name)
-        if not column_names:
-            raise ValueError("At least one column name must be provided")
-        columns = ", ".join([self.quote_column(col) for col in column_names])
-        query = f"SELECT TOP {limit} {columns} FROM {table_name}"
-        cursor = self.connection.cursor()
-        cursor.execute(query)
-        rows = cursor.fetchmany(limit)
-        return rows
+    def fetch_sample_values_from_database(
+        self,
+        table_name: str,
+        column_names: list[str],
+        limit: int = 5,
+    ) -> List[Tuple]:
+        table_name = self.qualified_table_name(table_name)
+        
+        if not column_names:
+            raise ValueError("At least one column name must be provided")
+        
+        # Validate and sanitize limit to prevent SQL injection
+        if not isinstance(limit, int) or limit <= 0:
+            raise ValueError("limit must be a positive integer")
+        
+        columns = ", ".join([self.quote_column(col) for col in column_names])
+        query = f"SELECT TOP {limit} {columns} FROM {table_name}"
+        
+        cursor = self.connection.cursor()
+        cursor.execute(query)
+        rows = cursor.fetchmany(limit)
+        return rows

Based on OWASP guidance, prepared statements force a strong distinction between code and data, eliminating SQL injection problems. The current implementation validates the limit parameter to ensure it's a positive integer before interpolation, which aligns with Sybase's TOP clause requirements that the argument must evaluate to a value greater than or equal to 0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants